Fixed rate limit issue in bulk publish #2022
Conversation
Pull Request Overview
This PR implements a comprehensive smart rate limiting system for the bulk publish functionality to prevent 429 rate limit errors and improve API request management. The solution introduces proactive rate limiting with intelligent batch sizing and queue management.
- Implements SmartRateLimiter class with singleton pattern per organization for intelligent rate limiting
- Replaces immediate bulk processing with pending item queues and optimal batch sizing
- Adds comprehensive rate limit tracking and proactive delay mechanisms to prevent 429 errors
Reviewed Changes
Copilot reviewed 14 out of 16 changed files in this pull request and generated 4 comments.
File | Description |
---|---|
packages/contentstack/package.json | Updates bulk-publish package version to 1.9.1 |
packages/contentstack-utilities/src/contentstack-management-sdk.ts | Adds support for includeResHeaders in request headers |
packages/contentstack-bulk-publish/src/util/smart-rate-limiter.js | New SmartRateLimiter class implementing intelligent rate limiting logic |
packages/contentstack-bulk-publish/src/util/common-utility.js | Adds handleRateLimit utility function for rate limit management |
packages/contentstack-bulk-publish/src/util/client.js | Enables response headers inclusion for rate limit tracking |
packages/contentstack-bulk-publish/src/producer/*.js | Integrates smart rate limiter into publish/unpublish producers |
packages/contentstack-bulk-publish/src/consumer/publish.js | Implements rate limit checking in all publish/unpublish operations |
Files not reviewed (1)
- pnpm-lock.yaml: Language not supported
delete assetobj.stack;
console.log(`Asset unpublished with Asset uid=${assetobj.assetUid}`);
console.log(chalk.red(`Could not Unpublish because of error=${formatError(error)}`));
This success handler is logging an error message with chalk.red instead of a success message with chalk.green. This appears to be copied from an error handler and should log a success message for asset unpublishing.
Suggested change:
- console.log(chalk.red(`Could not Unpublish because of error=${formatError(error)}`));
+ console.log(chalk.green(`Successfully unpublished asset with UID=${assetobj.assetUid} from environments=${assetobj.environments.join(', ')}`));
addLogs(
  logger,
  { options: assetobj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host },
  'info',
  'error',
This success handler is logging with 'error' level instead of 'info' level. This should be 'info' to match the success case.
Suggested change:
- 'error',
+ 'info',
`Asset UID '${asset.uid}' ${asset.version ? `and version '${asset.version}'` : ''} ${
  asset.locale ? `in locale '${asset.locale}'` : ''
}`,
`Asset UID '${asset.uid}' and version ${versionText} in locale ${localeText}`,
The string interpolation includes 'and version ${versionText}', but versionText already contains the 'and version' prefix when present, causing duplication like 'and version and version 1'. It should be just Asset UID '${asset.uid}' ${versionText} ${localeText}.
Suggested change:
- `Asset UID '${asset.uid}' and version ${versionText} in locale ${localeText}`,
+ `Asset UID '${asset.uid}' ${versionText} ${localeText}`,
@@ -350,24 +496,40 @@ async function performBulkPublish(data, _config, queue) {
  if (bulkPublishObj.apiVersion) {
    if (!isNaN(bulkPublishObj.apiVersion) && bulkPublishObj.apiVersion === apiVersionForNRP) {
      payload['api_version'] = bulkPublishObj.apiVersion;
This line is setting publish_with_reference on payload.details, but there's no guarantee that payload.details exists at this point. This could cause a runtime error if payload.details is undefined.
Suggested change:
  payload['api_version'] = bulkPublishObj.apiVersion;
+ payload.details = payload.details || {};
async function processPendingUnpublishItems(stack, environment, locale, apiVersion) {
  // Process assets first
  while (pendingAssetItems.length > 0) {
    const optimalBatchSize = smartRateLimiter.getOptimalBatchSize(pendingAssetItems.length);

    if (optimalBatchSize === 0) {
      // Rate limit exhausted, wait and retry
      smartRateLimiter.logStatus();
      await delay(1000);
      continue;
    }

    // Take the optimal batch size
    const batch = pendingAssetItems.splice(0, optimalBatchSize);

    try {
      await queue.Enqueue({
        assets: batch,
        Type: 'asset',
        locale: locale,
        environments: [environment],
        stack: stack,
        apiVersion,
      });

      smartRateLimiter.logStatus();
    } catch (error) {
      if (error.errorCode === 429) {
        // Rate limit error, put items back and wait
        pendingAssetItems.unshift(...batch);
        smartRateLimiter.logStatus();
        await delay(1000);
      } else {
        // Other error, log and continue
        console.log(`Error processing asset batch: ${error.message}`);
      }
    }
  }

  // Process entries
  while (pendingEntryItems.length > 0) {
    const optimalBatchSize = smartRateLimiter.getOptimalBatchSize(pendingEntryItems.length);

    if (optimalBatchSize === 0) {
      // Rate limit exhausted, wait and retry
      smartRateLimiter.logStatus();
      await delay(1000);
      continue;
    }

    // Take the optimal batch size
    const batch = pendingEntryItems.splice(0, optimalBatchSize);

    try {
      await queue.Enqueue({
        entries: batch,
        locale: locale,
        Type: 'entry',
        environments: [environment],
        stack: stack,
        apiVersion,
      });

      smartRateLimiter.logStatus();
    } catch (error) {
      if (error.errorCode === 429) {
        // Rate limit error, put items back and wait
        pendingEntryItems.unshift(...batch);
        smartRateLimiter.logStatus();
        await delay(1000);
      } else {
        // Other error, log and continue
        console.log(`Error processing entry batch: ${error.message}`);
      }
    }
  }
}
@sunil-lakshman,
The code above may lead to an infinite loop:
- If the API keeps responding with HTTP 429 errors and smartRateLimiter.getOptimalBatchSize() keeps returning 0, the loop will wait 1 second, retry with optimalBatchSize === 0, and repeat forever.
- The same can happen if items are repeatedly re-added via unshift and never allowed to dequeue because of rate limiting.
To mitigate the infinite loop, consider a maximum retry count and exponential backoff instead of the fixed 1-second delay.
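For illustration, that mitigation could look roughly like this in the asset loop (MAX_RETRIES, BASE_DELAY_MS, and retryCount are illustrative names, not from the PR):

const MAX_RETRIES = 5;       // hypothetical cap on consecutive rate-limit waits
const BASE_DELAY_MS = 1000;  // starting delay for exponential backoff
let retryCount = 0;

while (pendingAssetItems.length > 0) {
  const optimalBatchSize = smartRateLimiter.getOptimalBatchSize(pendingAssetItems.length);

  if (optimalBatchSize === 0) {
    if (retryCount >= MAX_RETRIES) {
      console.log(`Rate limit still exhausted after ${MAX_RETRIES} retries, aborting.`);
      break;
    }
    // Exponential backoff: 1s, 2s, 4s, 8s, ...
    await delay(BASE_DELAY_MS * 2 ** retryCount);
    retryCount++;
    continue;
  }

  retryCount = 0; // reset once the rate limiter allows a batch again
  // ... take a batch and enqueue it as in the original loop ...
}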
 */
async function handleRateLimit(error, data, delay, xRateLimitRemaining) {
  // Check if rate limit is exhausted or batch size exceeds remaining limit
  if (xRateLimitRemaining === 0 || data.length > xRateLimitRemaining) {
@sunil-lakshman, try to use optional chaining because we don't know the type of the data property. Also, what happens if xRateLimitRemaining is undefined?
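For example, the guard could be written defensively along these lines (a sketch, not the PR's actual code):

const remaining = Number(xRateLimitRemaining); // NaN when the header is missing
const batchSize = data?.length ?? 0;           // optional chaining in case data is undefined

if (Number.isNaN(remaining) || remaining === 0 || batchSize > remaining) {
  // rate limit unknown or exhausted: wait / re-enqueue as before
}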
// cliux.print(
//   `[${this.requestCount}] ${operation.toUpperCase()} ${itemType}: ${itemId} (${this.xRateLimitRemaining} remaining)`,
//   { color: 'cyan' }
// );
@sunil-lakshman, remove the commented-out code.
// cliux.print(
//   `[${this.requestCount}] ✓ ${operation.toUpperCase()} ${itemType}: ${itemId} - SUCCESS`,
//   { color: 'green' }
// );
@sunil-lakshman, remove the commented-out code.
// cliux.print(
//   `[${this.requestCount}] ✗ ${operation.toUpperCase()} ${itemType}: ${itemId} - FAILED (${errorCode})`,
//   { color: 'red' }
// );
@sunil-lakshman, remove the commented-out code.
const errorCode = error.errorCode || 'UNKNOWN';
const errorMessage = error.message || error.error_message || 'Unknown error';
@sunil-lakshman, try to use optional chaining since we aren't sure about the type of error.
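For instance, a sketch of that suggestion:

const errorCode = error?.errorCode ?? 'UNKNOWN';
const errorMessage = error?.message ?? error?.error_message ?? 'Unknown error';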
@@ -3817,8 +3820,7 @@ USAGE
   $ csdx launch:functions [-p <value>] [-d <value>]

 FLAGS
-  -d, --data-dir=<value>  [default: /Users/aman.kumar/Documents/cli-repos/logger-v2/cli/packages/contentstack] Current
-                          working directory
+  -d, --data-dir=<value>  [default: /Users/sunil.lakshman/Documents/cli/packages/contentstack] Current working directory
@sunil-lakshman, do not use your local path; use '/' instead.
@@ -1,7 +1,7 @@
 {
   "name": "@contentstack/cli-cm-bulk-publish",
   "description": "Contentstack CLI plugin for bulk publish actions",
-  "version": "1.9.0",
+  "version": "1.9.1",
@sunil-lakshman, this is not a small fix; consider bumping to a minor version instead of a patch version.
Fixed rate limit issue in bulk publish.
1. SmartRateLimiter Class
✅ Created SmartRateLimiter class with intelligent rate limiting logic
✅ Implemented singleton pattern using a Map to ensure one instance per organization (see the sketch after this list)
✅ Added proactive rate limiting to prevent 429 errors before they occur
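For illustration, the singleton-per-organization pattern described above could be sketched like this (the class shape and method names are illustrative, not necessarily the PR's exact code):

const rateLimiters = new Map(); // one SmartRateLimiter per organization UID

class SmartRateLimiter {
  constructor(orgUid) {
    this.orgUid = orgUid;
    this.serverRateLimit = null;     // filled from the x-ratelimit-limit header
    this.xRateLimitRemaining = null; // filled from the x-ratelimit-remaining header
    this.requestCount = 0;
    this.lastStatusLog = 0;
  }

  static getInstance(orgUid) {
    if (!rateLimiters.has(orgUid)) {
      rateLimiters.set(orgUid, new SmartRateLimiter(orgUid));
    }
    return rateLimiters.get(orgUid);
  }
}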
2. Rate Limit State Management
✅ Added serverRateLimit tracking to store the actual server limit from the x-ratelimit-limit header (see the header-parsing sketch after this list)
✅ Improved xRateLimitRemaining updates from server responses
✅ Added request counter to track total requests made
✅ Implemented lastStatusLog tracking to prevent duplicate status logs
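A hypothetical header-parsing method on a class like the one sketched above (illustrative; the PR may implement this differently):

updateFromResponse(headers) {
  // Headers arrive as strings; parse them defensively.
  const limit = Number(headers?.['x-ratelimit-limit']);
  const remaining = Number(headers?.['x-ratelimit-remaining']);

  if (!Number.isNaN(limit)) this.serverRateLimit = limit;
  if (!Number.isNaN(remaining)) this.xRateLimitRemaining = remaining;
  this.requestCount++;
}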
3. Proactive Rate Limiting Features
✅ Low rate limit detection: Forces delays when xRateLimitRemaining <= 2
✅ Conservative batch sizing (see the sketch after this list):
  - 1 item at a time when rate limit ≤ 2
  - Max 2 items when rate limit ≤ 5
✅ Intelligent waiting: Prevents hitting zero rate limit
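Under those thresholds, the batch sizing could be sketched roughly as follows (an illustrative getOptimalBatchSize, not necessarily the PR's exact implementation):

getOptimalBatchSize(pendingCount) {
  const remaining = this.xRateLimitRemaining;

  if (remaining === null || remaining === undefined) return 1; // no header seen yet: be cautious
  if (remaining === 0) return 0;                               // exhausted: caller should wait
  if (remaining <= 2) return Math.min(1, pendingCount);        // very low: one item at a time
  if (remaining <= 5) return Math.min(2, pendingCount);        // low: at most two items
  return Math.min(remaining, pendingCount);                    // otherwise use what's available
}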
4. Queue Management
✅ Smart re-enqueuing: Items are re-added to queue after rate limit delays
✅ Error handling: 429 errors trigger delays and re-enqueuing
✅ Preservation of items: No data loss during rate limiting
5. Batch Size Optimization
✅ Dynamic batch sizing: Adapts to current rate limit
✅ Conservative approach: Smaller batches when rate limit is low
✅ Optimal processing: Balances speed with rate limit compliance
6. Error Prevention
✅ Prevents 429 errors: Proactive delays when rate limit is low
✅ Graceful degradation: Continues processing even with rate limits
✅ Recovery mechanism: Automatically resumes after delays